Assignment with Teams of Workers

There are many versions of the assignment problem, which have additional constraints on the workers or tasks. In the next example, six workers are divided into two teams, and each team can perform at most two tasks.

The following sections present a Python program that solves this problem using the CP-SAT or the MIP solver. For a solution using the min cost flow solver, see the section Assignment with teams.

First, let's take a look at the CP-SAT solution to the problem.

The following code imports the required library.

from ortools.sat.python import cp_model

The following code creates the data for the program.

costs = [
   
[90, 76, 75, 70],
   
[35, 85, 55, 65],
   
[125, 95, 90, 105],
   
[45, 110, 95, 115],
   
[60, 105, 80, 75],
   
[45, 65, 110, 95],
]
num_workers
= len(costs)
num_tasks
= len(costs[0])

team1
= [0, 2, 4]
team2
= [1, 3, 5]
# Maximum total of tasks for any team
team_max
= 2

The following code creates the model.

model = cp_model.CpModel()

The following code creates an array of variables for the problem.

x = {}
for worker in range(num_workers):
   
for task in range(num_tasks):
        x
[worker, task] = model.NewBoolVar(f'x[{worker},{task}]')

There is one variable for each pair of a worker and task. Note that the workers are numbered 0 - 5, while the tasks are numbered 0 - 3, unlike in the original example, in which all nodes had to be numbered differently, as required by the min cost flow solver.

The following code creates the constraints for the program.

# Each worker is assigned to at most one task.
for worker in range(num_workers):
    model
.AddAtMostOne(x[worker, task] for task in range(num_tasks))

# Each task is assigned to exactly one worker.
for task in range(num_tasks):
    model
.AddExactlyOne(x[worker, task] for worker in range(num_workers))

# Each team takes at most two tasks.
team1_tasks
= []
for worker in team1:
   
for task in range(num_tasks):
        team1_tasks
.append(x[worker, task])
model
.Add(sum(team1_tasks) <= team_max)

team2_tasks
= []
for worker in team2:
   
for task in range(num_tasks):
        team2_tasks
.append(x[worker, task])
model
.Add(sum(team2_tasks) <= team_max)

The following code creates the objective function.

objective_terms = []
for worker in range(num_workers):
   
for task in range(num_tasks):
        objective_terms
.append(costs[worker][task] * x[worker, task])
model
.Minimize(sum(objective_terms))

The value of the objective function is the total cost over all variables that are assigned the value 1 by the solver.

The following code invokes the solver and displays the results.

solver = cp_model.CpSolver()
status
= solver.Solve(model)

Now, we can print the solution.

if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
   
print(f'Total cost = {solver.ObjectiveValue()}\n')
   
for worker in range(num_workers):
       
for task in range(num_tasks):
           
if solver.BooleanValue(x[worker, task]):
               
print(f'Worker {worker} assigned to task {task}.' +
                      f
' Cost = {costs[worker][task]}')
else:
   
print('No solution found.')

Here's the output of the program.

Total cost:  250
Worker 0 assigned to task 2. Cost = 75
Worker 1 assigned to task 0. Cost = 35
Worker 4 assigned to task 3. Cost = 75
Worker 5 assigned to task 1. Cost = 65
Time =  6  milliseconds

Here is the entire program.

"""Solve a simple assignment problem."""
from ortools.sat.python import cp_model


def main():
   
# Data
    costs
= [
       
[90, 76, 75, 70],
       
[35, 85, 55, 65],
       
[125, 95, 90, 105],
       
[45, 110, 95, 115],
       
[60, 105, 80, 75],
       
[45, 65, 110, 95],
   
]
    num_workers
= len(costs)
    num_tasks
= len(costs[0])

    team1
= [0, 2, 4]
    team2
= [1, 3, 5]
   
# Maximum total of tasks for any team
    team_max
= 2

   
# Model
    model
= cp_model.CpModel()

   
# Variables
    x
= {}
   
for worker in range(num_workers):
       
for task in range(num_tasks):
            x
[worker, task] = model.NewBoolVar(f'x[{worker},{task}]')

   
# Constraints
   
# Each worker is assigned to at most one task.
   
for worker in range(num_workers):
        model
.AddAtMostOne(x[worker, task] for task in range(num_tasks))

   
# Each task is assigned to exactly one worker.
   
for task in range(num_tasks):
        model
.AddExactlyOne(x[worker, task] for worker in range(num_workers))

   
# Each team takes at most two tasks.
    team1_tasks
= []
   
for worker in team1:
       
for task in range(num_tasks):
            team1_tasks
.append(x[worker, task])
    model
.Add(sum(team1_tasks) <= team_max)

    team2_tasks
= []
   
for worker in team2:
       
for task in range(num_tasks):
            team2_tasks
.append(x[worker, task])
    model
.Add(sum(team2_tasks) <= team_max)

   
# Objective
    objective_terms
= []
   
for worker in range(num_workers):
       
for task in range(num_tasks):
            objective_terms
.append(costs[worker][task] * x[worker, task])
    model
.Minimize(sum(objective_terms))

   
# Solve
    solver
= cp_model.CpSolver()
    status
= solver.Solve(model)

   
# Print solution.
   
if status == cp_model.OPTIMAL or status == cp_model.FEASIBLE:
       
print(f'Total cost = {solver.ObjectiveValue()}\n')
       
for worker in range(num_workers):
           
for task in range(num_tasks):
               
if solver.BooleanValue(x[worker, task]):
                   
print(f'Worker {worker} assigned to task {task}.' +
                          f
' Cost = {costs[worker][task]}')
   
else:
       
print('No solution found.')


if __name__ == '__main__':
    main
()

Next, we describe a solution to this assignment problem using the MIP solver.

The following code imports the required library.

from ortools.linear_solver import pywraplp

The following code creates the data for the program.

costs = [
   
[90, 76, 75, 70],
   
[35, 85, 55, 65],
   
[125, 95, 90, 105],
   
[45, 110, 95, 115],
   
[60, 105, 80, 75],
   
[45, 65, 110, 95],
]
num_workers
= len(costs)
num_tasks
= len(costs[0])

team1
= [0, 2, 4]
team2
= [1, 3, 5]
# Maximum total of tasks for any team
team_max
= 2

The following code creates the solver.

# Create the mip solver with the SCIP backend.
solver
= pywraplp.Solver.CreateSolver('SCIP')
if not solver:
   
return

The following code creates an array of variables for the problem.

# x[i, j] is an array of 0-1 variables, which will be 1
# if worker i is assigned to task j.
x
= {}
for worker in range(num_workers):
   
for task in range(num_tasks):
        x
[worker, task] = solver.BoolVar(f'x[{worker},{task}]')

The following code creates the constraints for the program.

# Each worker is assigned at most 1 task.
for worker in range(num_workers):
    solver
.Add(
        solver
.Sum([x[worker, task] for task in range(num_tasks)]) <= 1)

# Each task is assigned to exactly one worker.
for task in range(num_tasks):
    solver
.Add(
        solver
.Sum([x[worker, task] for worker in range(num_workers)]) == 1)

# Each team takes at most two tasks.
team1_tasks
= []
for worker in team1:
   
for task in range(num_tasks):
        team1_tasks
.append(x[worker, task])
solver
.Add(solver.Sum(team1_tasks) <= team_max)

team2_tasks
= []
for worker in team2:
   
for task in range(num_tasks):
        team2_tasks
.append(x[worker, task])
solver
.Add(solver.Sum(team2_tasks) <= team_max)

The following code creates the objective function.

objective_terms = []
for worker in range(num_workers):
   
for task in range(num_tasks):
        objective_terms
.append(costs[worker][task] * x[worker, task])
solver
.Minimize(solver.Sum(objective_terms))

The following code invokes the solver and displays the results.

status = solver.Solve()

Now, we can print the solution.

if status == pywraplp.Solver.OPTIMAL or status == pywraplp.Solver.FEASIBLE:
   
print(f'Total cost = {solver.Objective().Value()}\n')
   
for worker in range(num_workers):
       
for task in range(num_tasks):
           
if x[worker, task].solution_value() > 0.5:
               
print(f'Worker {worker} assigned to task {task}.' +
                      f
' Cost = {costs[worker][task]}')
else:
   
print('No solution found.')
print(f'Time = {solver.WallTime()} ms')

Here is the output of the program.

Minimum cost assignment:  250.0
Worker 0 assigned to task 2. Cost = 75
Worker 1 assigned to task 0. Cost = 35
Worker 4 assigned to task 3. Cost = 75
Worker 5 assigned to task 1. Cost = 65
Time =  6  milliseconds

Here is the entire program.

"""MIP example that solves an assignment problem."""
from ortools.linear_solver import pywraplp


def main():
   
# Data
    costs
= [
       
[90, 76, 75, 70],
       
[35, 85, 55, 65],
       
[125, 95, 90, 105],
       
[45, 110, 95, 115],
       
[60, 105, 80, 75],
       
[45, 65, 110, 95],
   
]
    num_workers
= len(costs)
    num_tasks
= len(costs[0])

    team1
= [0, 2, 4]
    team2
= [1, 3, 5]
   
# Maximum total of tasks for any team
    team_max
= 2

   
# Solver
   
# Create the mip solver with the SCIP backend.
    solver
= pywraplp.Solver.CreateSolver('SCIP')
   
if not solver:
       
return

   
# Variables
   
# x[i, j] is an array of 0-1 variables, which will be 1
   
# if worker i is assigned to task j.
    x
= {}
   
for worker in range(num_workers):
       
for task in range(num_tasks):
            x
[worker, task] = solver.BoolVar(f'x[{worker},{task}]')

   
# Constraints
   
# Each worker is assigned at most 1 task.
   
for worker in range(num_workers):
        solver
.Add(
            solver
.Sum([x[worker, task] for task in range(num_tasks)]) <= 1)

   
# Each task is assigned to exactly one worker.
   
for task in range(num_tasks):
        solver
.Add(
            solver
.Sum([x[worker, task] for worker in range(num_workers)]) == 1)

   
# Each team takes at most two tasks.
    team1_tasks
= []
   
for worker in team1:
       
for task in range(num_tasks):
            team1_tasks
.append(x[worker, task])
    solver
.Add(solver.Sum(team1_tasks) <= team_max)

    team2_tasks
= []
   
for worker in team2:
       
for task in range(num_tasks):
            team2_tasks
.append(x[worker, task])
    solver
.Add(solver.Sum(team2_tasks) <= team_max)

   
# Objective
    objective_terms
= []
   
for worker in range(num_workers):
       
for task in range(num_tasks):
            objective_terms
.append(costs[worker][task] * x[worker, task])
    solver
.Minimize(solver.Sum(objective_terms))

   
# Solve
    status
= solver.Solve()

   
# Print solution.
   
if status == pywraplp.Solver.OPTIMAL or status == pywraplp.Solver.FEASIBLE:
       
print(f'Total cost = {solver.Objective().Value()}\n')
       
for worker in range(num_workers):
           
for task in range(num_tasks):
               
if x[worker, task].solution_value() > 0.5:
                   
print(f'Worker {worker} assigned to task {task}.' +
                          f
' Cost = {costs[worker][task]}')
   
else:
       
print('No solution found.')
   
print(f'Time = {solver.WallTime()} ms')


if __name__ == '__main__':
    main
()

This section describes an assignment problem in which each task has a size, which represents how much time or effort the task requires. The total size of the tasks performed by each worker has a fixed bound. We'll present Python programs that solve

Updated Jan 2, 2023

This section describes an assignment problem in which only certain allowed groups of workers can be assigned to the tasks. In the example there are twelve workers, numbered 0 - 11. The allowed groups are combinations of the following pairs of

Updated Jan 2, 2023

This section describes the linear sum assignment solver, a specialized solver for the simple assignment problem, which can be faster than either the MIP or CP-SAT solver. However, the MIP and CP-SAT solvers can handle a much wider array of problems,

Updated Jan 18, 2023